MR 相关概念
- Job(作业) : 一个MR程序称为一个Job
- MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。负责 Job 中执行状态的监控,容错,和 RM 申请资源,提交 Task 等。
- Task(任务): Task是一个进程,负责某项计算。
- Map(Map阶段): Map 是 MapReduce 程序运行的第一个阶段。Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分。切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算,负责 Map 阶段的 Task 称为 MapTask。在一个 MR 程序的 Map 阶段,会启动N(取决于切片数,多少个切片就会启动多少个 MapTask)个 MapTask。每个 MapTask 是并行运行。
- Reduce(Reduce阶段): Reduce 是MapReduce 程序运行的第二个阶段(最后一个阶段),Reduce 阶段的目的是将 Map 阶段,每个 MapTask 计算后的结果进行合并汇总,得到最终结果。Reduce阶段是可选的,不一定有。负责 Reduce 阶段的 Task 称为ReduceTask。一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行,每个ReduceTask最终都会产生一个结果。
MR 相关组件
- Mapper: map 阶段核心的处理逻辑
- Reducer: reduce 阶段核心的处理逻辑
- InputFormat: 输入格式。MR 程序必须指定一个输入目录,一个输出目录,InputFormat 代表输入目录中文件的格式。如果是普通文件,可以使用FileInputFormat。如果是SequeceFile(hadoop提供的一种文件格式),可以使用 SequnceFileInputFormat,如果处理的数据在数据库中,需要使用 DBInputFormat。
- RecordReader: 记录读取器。RecordReader 负责从输入格式中,读取数据,读取后封装为一组记录(k-v)。
- OutPutFormat: 输出格式。OutPutFormat 代表 MR 处理后的结果,要以什么样的文件格式写出。将结果写出到一个普通文件中,可以使用 FileOutputFormat,将结果写出到数据库中,可以使用 DBOutPutFormat,将结果写出到 SequeceFil e中,可以使用 SequnceFileOutputFormat。
- RecordWriter: 记录写出器。将处理的结果以什么样的格式写出到输出文件中。
- Partitioner: 分区器。负责在 Mapper 将数据写出时,为每组 keyout-valueout 打上标记,进行分区。一个ReduceTask只会处理一个分区的数据。
MR 流程
- InputFormat 调用 RecordReader,从输入目录的文件中,读取一组数据,封装为 keyin-valuein 对象
- 将封装好的 key-value,交给 Mapper.map() ——>将处理的结果写出 keyout-valueout
- ReduceTask 启动 Reducer,使用 Reducer.reduce() 处理 Mapper写出的 keyout-valueout,
- OutPutFormat 调用 RecordWriter,将 Reducer 处理后的 keyout-valueout 写出到文件
Map阶段(MapTask): 切片(Split) —– 读取数据(Read) —– 交给Mapper处理(Map) —– 分区和排序(sort)
Reduce阶段(ReduceTask): 拷贝数据(copy) —– 排序(sort) —– 合并(reduce) —– 写出(write)
MR 编程
MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可,步骤:
- Map 阶段的核心处理逻辑需要编写在 Mapper 中
- Reduce 阶段的核心处理逻辑需要编写在 Reducer 中
- 将编写的 Mapper 和 Reducer 进行组合,组合成一个 Job
- 对 Job 进行设置,设置后运行
wordcount
InputFormat 的实现类很多
InputFormat 的作用:
- 验证输入目录中的文件格式,是否符合当前 Job 的要求
- 生成切片,每个切片都会交给一个 MapTask 处理
- 提供 RecordReader,由 RecordReader 从切片中读取记录,交给 Mapper 处理
InputFormat 中的 List<InputSplit> getSplits
方法的功能就是切片。ecordReader<K,V> createRecordReader
的功能是创建 RecordReader。默认 Hadoop 使用的是 TextInputFormat,而 TextInputFormat 创建的 RecordReader 是 LineRecordReader。所以 Hadoop 默认的 InputFormat 使用 TextInputFormat,默认是 Reader 使用 LineRecordReader。
本地模式
WCMapper 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yanrs.mr.wordcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1);
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("keyin: " + key + " valuein: " + value); String[] words = value.toString().split("\t"); for (String word:words) { outKey.set(word); context.write(outKey, outValue); } } }
|
WCReducer 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yanrs.mr.wordcount;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value:values) { sum+=value.get(); }
outValue.set(sum); context.write(key, outValue); } }
|
WCDriver 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.yanrs.mr.wordcount;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000"); FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/wcinput"); Path outPath = new Path("/mroutput"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJobName("wordcount");
job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
直接在 idea 中运行 WCDriver 的 main 方法即可。上面设置连接的是 Hadoop10 的文件系统,但是是在本地运行的。
代码地址
yarn 上运行
WCMapper 完整代码,同上
WCReducer 完整代码,同上
在 yarn 上运行,需要指定运行方式为 yarn,且指定 resourcemanager 的地址
1 2 3
| conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hadoop11");
|
还需要设置 job 所在的 jar 包
1 2 3
| job.setJarByClass(WCDriver.class);
|
将代码打包,上传到 hadoop 上,使用 hadoop jar 命令运行
1
| hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.yanrs.mr.wordcount.WCDriver
|
WCDriver 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.yanrs.mr.wordcount;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class WCDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000"); conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hadoop11");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/wcinput"); Path outPath = new Path("/mroutput"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJarByClass(WCDriver.class);
job.setJobName("wordcount");
job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
代码地址
自定义 Bean
数据格式如上所示,需要统计每个手机消耗的上行,下行,总流量信息
FlowBeanMapper 代码如下,mapper 输入参数 key 为行号,value 为一行的文本。mapper 输出参数 key 手机号,value 为 bean 对象(对象中分别有上行,下行,总流量三个属性)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yanrs.mr.flowbean;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ private Text outKey = new Text(); private FlowBean flowBean = new FlowBean();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t");
outKey.set(words[1]); flowBean.setUpFlow(Long.parseLong(words[words.length - 3])); flowBean.setDownFlow(Long.parseLong(words[words.length - 2])); context.write(outKey, flowBean); } }
|
FlowBean 为实体类,有三个属性,需要实现 hadoop 的序列化方法。需要重写 write(称为序列化) 和 readFields(称为反序列化) 方法。并且反序列化和序列化的顺序要一致,并且提供属性的 get,set 方法,空参构造,toString 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package com.yanrs.mr.flowbean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow;
@Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); }
@Override public void readFields(DataInput dataInput) throws IOException { upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); sumFlow = dataInput.readLong(); }
public long getUpFlow() { return upFlow; }
public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
public long getDownFlow() { return downFlow; }
public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
public long getSumFlow() { return sumFlow; }
public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
public FlowBean() { }
@Override public String toString() { return "FlowBean{" + "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + '}'; } }
|
FlowBeanReducer 处理 FlowBeanMapper 输出的数据,所以输入 key 和 value 的类型分别为 Text 和 FlowBean。输出也为 Text, FlowBean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yanrs.mr.flowbean;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowBeanReducer extends Reducer <Text, FlowBean, Text, FlowBean>{
private FlowBean outValue = new FlowBean();
@Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0;
for (FlowBean flowBean: values) { sumUpFlow += flowBean.getUpFlow(); sumDownFlow += flowBean.getDownFlow(); }
outValue.setDownFlow(sumDownFlow); outValue.setUpFlow(sumUpFlow); outValue.setSumFlow(sumDownFlow + sumUpFlow);
context.write(key, outValue); } }
|
FlowBeanDriver 中设置输入和输出目录,设置 MapperClass 和 ReducerClass。设置 Mapper,Reducer 的输出 key 和 value 类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package com.yanrs.mr.flowbean;
import com.yanrs.mr.wordcount.WCMapper; import com.yanrs.mr.wordcount.WCReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class FlowBeanDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/flowbean"); Path outPath = new Path("/mroutput/flowbean"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setJobName("FlowBean");
job.setMapperClass(FlowBeanMapper.class); job.setReducerClass(FlowBeanReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
因为没有配置在 yarn 上运行,所以直接 idea 运行即可。结果如下
1 2 3 4 5 6 7 8
| 13470253144 FlowBean{upFlow=180, downFlow=180, sumFlow=360} 13509468723 FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684} 13560439638 FlowBean{upFlow=918, downFlow=4938, sumFlow=5856} 13568436656 FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232} 13590439668 FlowBean{upFlow=1116, downFlow=954, sumFlow=2070} 13630577991 FlowBean{upFlow=6960, downFlow=690, sumFlow=7650} 13682846555 FlowBean{upFlow=1938, downFlow=2910, sumFlow=4848} ......
|
代码地址
默认的切片流程
片和块的关系
片:在计算MR程序时,才会切片。在运行程序时,临时将文件从逻辑上划分为若干部分(所以只是逻辑上的切片,并不是真正的切分),使用的输入格式不同(不同的 InputFormat),切片的方式不同,切片的数量也不同。每片的数据最终也是以块的形式存储在 HDFS。
块: 在向HDFS写文件时,文件中的内容以块为单位存储,块是实际的物理存在。
建议: 片大小最好等于块大小,将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO,MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO。理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的 MapTask 少,Map 阶段运算慢,片越小,切片数量多,启动的MapTask多,Map阶段运算快。默认情况下片大小就是块大小,即文件的块大小默认为 128M,默认每片就是128M。MapTask的数量只取决于切片数,有多少切片就有多少个 MapTask
如果需要调节片大小 > 块大小:那么需要配置 mapreduce.input.fileinputformat.split.minsize
> 128M
如果需要调节片大小 < 块大小:那么需要配置 mapreduce.input.fileinputformat.split.maxsize
< 128M
- 获取当前输入目录中所有的文件
- 以文件为单位切片,如果文件为空文件,默认创建一个空的切片
- 如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)
- 如果文件不可切,整个文件作为1片
- 如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…
- 剩余部分整个作为1片
常见的输入格式
FileInputFormat 中有六个子类,下面总结一下常见的四个子类的切片策略和 RecordReader
TextInputFormat
TextInputFormat 常用于输入目录中全部是文本文件
切片策略: 默认的切片策略
RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text
上面的 wordcount 例子就是使用的默认的 TextInputFormat
切片策略: 以文件为单位,读取配置中 mapreduce.input.lineinputformat.linespermap
参数(默认为1),每次这么多行切为一片。
RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text
NLMapper 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yanrs.mr.nline;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class NLMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1);
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("keyin: " + key + " valuein: " + value); String[] words = value.toString().split("\t"); for (String word:words) { outKey.set(word); context.write(outKey, outValue); } } }
|
NLReducer 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yanrs.mr.nline;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class NLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value:values) { sum+=value.get(); }
outValue.set(sum); context.write(key, outValue); } }
|
NLDriver
完整代码。在 Driver
中新增设置使用 NLineInputFormat
。默认是一行切分为一片,如果需要设置可以在 conf 中设置 mapreduce.input.lineinputformat.linespermap
值即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.yanrs.mr.nline;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class NLDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/nline"); Path outPath = new Path("/mroutput/nline"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setInputFormatClass(NLineInputFormat.class);
job.setJobName("nline");
job.setMapperClass(NLMapper.class); job.setReducerClass(NLReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
代码地址
KeyValueTextInputFormat
针对文本文件,使用分割字符,将每一行分割为 key 和 value,如果没有找到分隔符,当前行的内容作为 key,value 为空串。默认分隔符为 \t
,可以通过参数 mapreduce.input.keyvaluelinerecordreader.key.value.separator
指定。
切片策略:默认的切片策略
RecordReader : key 和 value 的类型都是 Text
KVMapper 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.yanrs.mr.keyvalue;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class KVMapper extends Mapper<Text, Text, Text, IntWritable> {
private IntWritable outValue = new IntWritable(1);
@Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, outValue); } }
|
KVReducer 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yanrs.mr.keyvalue;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class KVReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value:values) { sum+=value.get(); }
outValue.set(sum); context.write(key, outValue); } }
|
KVDriver 完整代码如下,需要设置使用 KeyValueTextInputFormat,并且需要设置分隔符,需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.yanrs.mr.keyvalue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class KVDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000"); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "*");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/keyvalue"); Path outPath = new Path("/mroutput/keyvalue"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setJobName("keyvalue");
job.setMapperClass(KVMapper.class); job.setReducerClass(KVReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
代码地址
CombineTextInputFormat
改变了传统的切片方式。将多个小文件,划分到一个切片中,适合小文件过多的场景。
切片策略: 先确定片的最大值 maxSize,maxSize 通过参数 mapreduce.input.fileinputformat.split.maxsize
设置。流程是以文件为单位,将每个文件划分为若干 part,如果文件的待切部分的大小小于等于 maxSize, 则整个待切部分作为1个 part,如果文件的待切部分的大小大于 maxsize 但是小于等于 2 maxSize, 那么将整个待切部分均匀的切分为2个 part。如果文件的待切部分的大小大于 2 maxSize, 那么先切去 maxSize 大小,得到 1个 part,剩余待切部分继续判断
RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为 key,一行内容作为 value,即 key 的类型为 LongWritable,value 的类型为 Text
CMMapper 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yanrs.mr.combine;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CMMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1);
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("keyin: " + key + " valuein: " + value); String[] words = value.toString().split("\t"); for (String word:words) { outKey.set(word); context.write(outKey, outValue); } } }
|
CMReducer 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yanrs.mr.combine;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value:values) { sum+=value.get(); }
outValue.set(sum); context.write(key, outValue); } }
|
CMDriver 完整代码。需要设置多大文件切为一片,设置使用 CombineTextInputFormat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| package com.yanrs.mr.combine;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException; import java.net.URISyntaxException;
public class CMDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop10:9000"); conf.set("mapreduce.input.fileinputformat.split.maxsize", "2048");
FileSystem fileSystem = FileSystem.get(conf);
Path inputPath = new Path("/mrinput/combine"); Path outPath = new Path("/mroutput/combine"); if(fileSystem.exists(outPath)){ fileSystem.delete(outPath, true); }
Job job = Job.getInstance(conf);
job.setInputFormatClass(CombineTextInputFormat.class);
job.setJobName("combine");
job.setMapperClass(CMMapper.class); job.setReducerClass(CMReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true); } }
|
代码地址
MR 核心阶段划分
MapTask 阶段
- map
- sort
RedcueTask 阶段
- copy
- sort
- reduce
shuffle 阶段
上面的 2-4 又称为 shuffle 阶段。Shuffle 阶段横跨 MapTask 和 RedcueTask,在MapTask端也有 Shuffle,在RedcueTask 也有 Shuffle。具体 Shuffle 阶段指 MapTask 的 map 方法运行之后到 RedcuceTask 的 reduce 方法运行之前。
总结
mapper 的输出,为 reducer 的输入,mapper 的输出由不同的 InputFormat 的 RecordReader 决定。
不同的 InputFormat 有着不同的切片策略,默认如果不设置,那么使用的是 TextInputFormat。
reduce 方法一次处理一组数据,key 相同的数据为一组。
mapper 和 reducer 的输出数据格式由自己根据需求来设置,可以是 hadoop 内置的类型,也可以自定义 bean。
如果要将编写好的程序在 yarn 上运行,那么需要配置 yarn 的地址,设置 job 所在的 jar 包,将程序打包为 jar 之后运行。